package com.javaDemo.shouxierpc.provider;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ProviderReflect {
    private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
    // ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    // ScheduledExecutorService executorServices = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

    // 线程池不允许使用Executors去创建，而是通过ThreadPoolExecutor的方式，这样的处理方式让写的同学更加明确线程池的运行规则，规避资源耗尽的风险。 说明：Executors返回的线程池对象的弊端如下：
    //         1）FixedThreadPool和SingleThreadPool:
    //           允许的请求队列长度为Integer.MAX_VALUE，可能会堆积大量的请求，从而导致OOM。
    //         2）CachedThreadPool:
    //           允许的创建线程数量为Integer.MAX_VALUE，可能会创建大量的线程，从而导致OOM。
    //
    // Positive example 1：
    // //org.apache.commons.lang3.concurrent.BasicThreadFactory
    // ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    //         new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
    //
    //
    //
    // Positive example 2：
    // ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
    //         .setNameFormat("demo-pool-%d").build();
    //
    // //Common Thread Pool
    // ExecutorService pool = new ThreadPoolExecutor(5, 200,
    //         0L, TimeUnit.MILLISECONDS,
    //         new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    //
    // pool.execute(()-> System.out.println(Thread.currentThread().getName()));
    // pool.shutdown();//gracefully shutdown
    //
    //
    //
    // Positive example 3：
    // <bean id="userThreadPool"
    // class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    //     <property name="corePoolSize" value="10" />
    //     <property name="maxPoolSize" value="100" />
    //     <property name="queueCapacity" value="2000" />
    //
    // <property name="threadFactory" value= threadFactory />
    //     <property name="rejectedExecutionHandler">
    //         <ref local="rejectedExecutionHandler" />
    //     </property>
    // </bean>
    //         //in code
    //         userThreadPool.execute(thread);

    ThreadFactory threadFactory=new ThreadFactoryBuilder().build();
            ExecutorService service=new ThreadPoolExecutor(5,10,1, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory,new ThreadPoolExecutor.AbortPolicy());
    /**
     * RPC监听和远程方法调用
     * @param service RPC远程方法调用的接口实例
     * @param port 监听的端口
     * @throws Exception
     */
    public static void provider(final Object service,int port) throws Exception {
        //创建服务端的套接字，绑定端口port
        ServerSocket serverSocket = new ServerSocket(port);
        while (true) {
            //开始接收客户端的消息，并以此创建套接字
            final Socket socket = serverSocket.accept();
            //多线程执行，这里的问题是连接数过大，线程池的线程数会耗尽
            executorService.execute(() -> {
                try {
                    //创建呢一个对内传输的对象流，并绑定套接字
                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                    try {
                        try {
                            //从对象流中读取接口方法的方法名
                            String methodName = input.readUTF();
                            //从对象流中读取接口方法的所有参数
                            Object[] args = (Object[]) input.readObject();
                            Class[] argsTypes = new Class[args.length];
                            for (int i = 0;i < args.length;i++) {
                                argsTypes[i] = args[i].getClass();

                            }
                            //创建一个对外传输的对象流，并绑定套接字
                            //这里是为了将反射执行结果传递回消费者端
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                Class<?>[] interfaces = service.getClass().getInterfaces();
                                Method method = null;
                                for (int i = 0;i < interfaces.length;i++) {
                                    method = interfaces[i].getDeclaredMethod(methodName,argsTypes);
                                    if (method != null) {
                                        break;
                                    }
                                }
                                Object result = method.invoke(service, args);
                                //将反射执行结果写入对外传输的对象流中
                                output.writeObject(result);
                            } catch (Throwable t) {
                                output.writeObject(t);
                            } finally {
                                output.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            input.close();
                        }
                    } finally {
                        socket.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}